package day05;

import beans.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Session;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Flink Table API and SQL: group-window aggregation.
 * <p>
 * Reads sensor readings from a text file, assigns event-time timestamps and watermarks,
 * and runs the same aggregation (count and average temperature per sensor id) over
 * tumbling, sliding (hop) and session windows, once with the Table API and once with SQL.
 * <p>
 * Window aggregate functions (Alibaba Cloud documentation):
 * https://help.aliyun.com/document_detail/62510.html
 *
 * @author lvbingbing
 * @date 2022-01-20 19:12
 */
public class FlinkTableApi07 {
    /**
     * Program entry point: configures the environment, builds the window pipelines and runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if {@code env.execute()} fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int parallelism = 1;
        env.setParallelism(parallelism);
        // 2. Use event time as the stream time characteristic.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 3. Register the group-window pipelines.
        studyGroupWindow(env);
        // 4. Trigger job execution.
        env.execute();
    }

    /**
     * Builds the source stream and table, then registers the tumbling, sliding and
     * session window aggregations on it.
     *
     * @param env the stream execution environment
     */
    private static void studyGroupWindow(StreamExecutionEnvironment env) {
        // 1. Read the file, parse each comma-separated line and assign event-time
        //    timestamps plus watermarks (bounded out-of-orderness of 2 seconds).
        DataStream<SensorReading> dataStreamSource = env.readTextFile("input/sensor.txt")
                .map(line -> {
                    String[] fields = line.split(",");
                    // valueOf(...) replaces the deprecated boxed-type constructors
                    // new Long(String) / new Double(String); parsing rules are the same.
                    return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {

                    private static final long serialVersionUID = 861425454126517843L;

                    @Override
                    public long extractTimestamp(SensorReading sensorReading) {
                        // Presumably the file stores epoch seconds; scaled to milliseconds here.
                        return sensorReading.getTimestamp() * 1000L;
                    }
                });
        // 2. Create the table environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 3. Convert the stream to a Table (renaming fields and adding the rowtime
        //    attribute "rt") and register it as the temporary view "sensor" for SQL.
        Table dataStreamTable = tableEnv.fromDataStream(dataStreamSource, "id, timestamp as ts, temperature as temp, rt.rowtime");
        tableEnv.createTemporaryView("sensor", dataStreamTable);
        // 4. Group windows.
        // 4.1 Tumbling window.
        tumbleWindow(dataStreamTable, tableEnv);
        // 4.2 Sliding (hop) window.
        hopWindow(dataStreamTable, tableEnv);
        // 4.3 Session window.
        sessionWindow(dataStreamTable, tableEnv);
    }

    /**
     * Tumbling window of 10 seconds, grouped by sensor id.
     * Alibaba Cloud documentation: https://help.aliyun.com/document_detail/62511.html
     *
     * @param dataStreamTable the Table obtained from the data stream
     * @param tableEnv        the table environment
     */
    private static void tumbleWindow(Table dataStreamTable, StreamTableEnvironment tableEnv) {
        // 1. Table API.
        Table resultTable = dataStreamTable.window(Tumble.over("10.seconds").on("rt").as("tw"))
                .groupBy("id, tw")
                .select("id, id.count, temp.avg, tw.start, tw.end");
        printAppendStream(tableEnv, resultTable, "tumbleDs");
        // 2. SQL.
        // NOTE(review): the two literals join without a space ("...second)from sensor...");
        // the text is intentionally left unchanged.
        String sql = "select id, count(id) as cnt, avg(temp) as avgTemp, tumble_start(rt, interval '10' second),tumble_end(rt, interval '10' second)" +
                "from sensor group by id, tumble(rt, interval '10' second)";
        printAppendStream(tableEnv, tableEnv.sqlQuery(sql), "tumbleSqlDs");
    }

    /**
     * Sliding (hop) window of 10 seconds that advances every 2 seconds, grouped by sensor id.
     * Alibaba Cloud documentation: https://help.aliyun.com/document_detail/62512.html
     *
     * @param dataStreamTable the Table obtained from the data stream
     * @param tableEnv        the table environment
     */
    private static void hopWindow(Table dataStreamTable, StreamTableEnvironment tableEnv) {
        // 1. Table API.
        Table resultTable = dataStreamTable.window(Slide.over("10.seconds").every("2.seconds").on("rt").as("hw"))
                .groupBy("id, hw")
                .select("id, id.count, temp.avg, hw.start, hw.end");
        printAppendStream(tableEnv, resultTable, "slidingDs");
        // 2. SQL.
        // NOTE(review): the two literals join without a space ("...second)from sensor...");
        // the text is intentionally left unchanged.
        String sql = "select id, count(id) as cnt, avg(temp) as avgTemp, hop_start(rt, interval '2' second, interval '10' second) , hop_end(rt, interval '2' second, interval '10' second)" +
                "from sensor group by id, hop(rt, interval '2' second, interval '10' second)";
        printAppendStream(tableEnv, tableEnv.sqlQuery(sql), "slidingSqlDs");
    }

    /**
     * Session window with a 30-second gap, grouped by sensor id.
     * Alibaba Cloud documentation: https://help.aliyun.com/document_detail/62513.html
     *
     * @param dataStreamTable the Table obtained from the data stream
     * @param tableEnv        the table environment
     */
    private static void sessionWindow(Table dataStreamTable, StreamTableEnvironment tableEnv) {
        // 1. Table API.
        Table resultTable = dataStreamTable.window(Session.withGap("30.seconds").on("rt").as("sw"))
                .groupBy("id, sw")
                .select("id, id.count, temp.avg, sw.start, sw.end");
        printAppendStream(tableEnv, resultTable, "sessionDs");
        // 2. SQL.
        // NOTE(review): the two literals join without a space ("...second)from sensor...");
        // the text is intentionally left unchanged.
        String sql = "select id, count(id) as cnt, avg(temp) as avgTemp, session_start(rt, interval '30' second) ,session_end(rt, interval '30' second)" +
                "from sensor group by id, session(rt, interval '30' second)";
        printAppendStream(tableEnv, tableEnv.sqlQuery(sql), "sessionSqlDs");
    }

    /**
     * Converts a table to an append-only stream of {@link Row} and prints it under the given label.
     *
     * @param tableEnv the table environment used for the conversion
     * @param table    the table to convert
     * @param label    the identifier passed to {@code print}
     */
    private static void printAppendStream(StreamTableEnvironment tableEnv, Table table, String label) {
        DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class);
        rows.print(label);
    }
}